Doris迁移至AnalyticDB PostgreSQL版

更新时间:

本文介绍如何将Doris数据迁移至云原生数据仓库AnalyticDB PostgreSQL

准备工作

操作步骤

步骤一:创建用于装载数据的目标表

云原生数据仓库AnalyticDB PostgreSQL实例中创建用于装载Doris数据的目标表。目标表结构需与源表结构对应。建表语法,请参见建表语句

步骤二:将Doris的数据导入到OSS

Doris支持使用S3协议将数据导出至对象存储,OSS同样支持S3协议。您可以将数据直接上传至OSS,但针对不同的Doris版本其支持的导出能力也有所不同:

  • Doris2.0及以上版本支持使用EXPORT TABLE语句导出数据,导出格式为csv、text、orcparquet。其中以parquet格式导出的数据与主流parquet格式并不兼容,并且使用csvtext格式不支持导出数据中含有换行符等特殊字符,因此更推荐使用orc格式,该格式导出速度较快。

  • Doris1.2版本的EXPORT TABLE语句仅支持导出格式为csv,导出格式较为单一,不推荐使用。推荐使用SELECT INTO OUTFILE语句,导出格式为orc,速度相比EXPORT TABLE语句稍慢一些,但支持使用WHERE子句进行过滤。

使用示例

Doris2.0Doris1.2版本的导出语句如下:

------ Doris 2.0
EXPORT TABLE s3_test TO "s3://bucket/dir/" 
PROPERTIES (
  "format"="orc"
)
WITH s3 (
    "AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
    "AWS_ACCESS_KEY" = "LTA****",
    "AWS_SECRET_KEY" = "lQEI1TSJIY0******",
    "AWS_REGION" = "shanghai"
)
---- Doris 1.2 c列 为datetime格式
SELECT a, b, CAST(c AS string) AS c FROM s3_test INTO OUTFILE "s3://bucket/dir/"
FORMAT AS orc
PROPERTIES
(
    "AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
    "AWS_ACCESS_KEY" = "LTA****",
    "AWS_SECRET_KEY" = "lQEI1TSJIY0******",
    "AWS_REGION" = "shanghai"
);
说明

如果表中含有DATETIME类型的列,则只能使用SELECT INTO FILE语句。因为DorisDATETIME类型导出的格式与主流产品不兼容,需要转换为STRING类型。

步骤三:将OSS数据导入到AnalyticDB PostgreSQL

您可以通过COPY命令或使用OSS外表将数据导入云原生数据仓库AnalyticDB PostgreSQL

使用示例

使用COPY命令导入OSS的语句如下:

COPY test1 FROM 'oss://bucket/dir/' ACCESS_KEY_ID 'LTAI5t********' SECRET_ACCESS_KEY 'lQEI1T*******' 
FORMAT AS orc ENDPOINT 'oss-*****-
internal.aliyuncs.com' FDW 'oss_fdw' ;

语法转换

数据类型

Doris

AnalyticDB PostgreSQL

备注

BOOLEAN

BOOLEAN

TINYINT

SMALLINT

云原生数据仓库AnalyticDB PostgreSQL没有TINYINT。

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

LARGEINT

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

DATE

DATE

DATETIME

TIMESTAMP/TIMSTAMPTZ

CHAR

CHAR

VARCHAR

VARCHAR

STRING

TEXT

HLL

/

  • HLL是模糊去重,在数据量大的情况性能优于Count Distinct。HLL的误差通常在1%左右,有时会达到2%。HLL不能作为Key列使用,建表时配合聚合类型为HLL_UNION。

  • 用户不需要指定长度和默认值。长度根据数据的聚合程度系统内控制。HLL列只能通过配套的hll_union_agg、hll_raw_agg、hll_cardinality、hll_hash进行查询或使用。

BITMAP

/

BITMAP类型的列可以在Aggregate表或Unique表中使用。

QUANTILE_STATE

/

ARRAY

[ ]

MAP

自定义复合类型

STRUCT

自定义复合类型

JSON

JSON

AGG_STATE

/

VARIANT

自定义复合类型

建表语句

以下为几种常见的建表语句模型。

模型一:明细模型

明细模型没有主键和聚合列限制,在建表语句中指定的DUPLICATE KEY,是用来指明底层数据按照哪些列进行排序。在云原生数据仓库AnalyticDB PostgreSQL中对应转换为AOCSBEAM表,并使用ORDER BY语句指定排序键,启动AUTOMERGE可以定期进行数据自动排序。

使用示例
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
    `timestamp` DATETIME NOT NULL COMMENT "日志时间",
    `type` INT NOT NULL COMMENT "日志类型",
    `error_code` INT COMMENT "错误码",
    `error_msg` VARCHAR(1024) COMMENT "错误详细信息",
    `op_id` BIGINT COMMENT "负责人id",
    `op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY (`timestamp`,`type`,`error_code`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
    "timestamp" TIMESTAMP NOT NULL ,
    "type" INT NOT NULL ,
    error_code INT ,
    error_msg VARCHAR(1024),
    op_id BIGINT,
    op_time TIMESTAMP
)
WITH(appendonly = true, orientation = column)
DISTRIBUTED BY("type")
ORDER BY("timestamp","type",error_code);

COMMENT ON COLUMN example_tbl_by_default.timestamp IS '日志时间';

模型二:主键模型

主键模型的主要目的是为了确保数据主键的唯一性,使用UNIQUE KEY来指定唯一性约束,在云原生数据仓库AnalyticDB PostgreSQL中对应heap表,并使用PRIMARY KEY来指定唯一键。

使用示例
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
    `user_id` LARGEINT NOT NULL COMMENT "用户id",
    `username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
    `city` VARCHAR(20) COMMENT "用户所在城市",
    `age` SMALLINT COMMENT "用户年龄",
    `sex` TINYINT COMMENT "用户性别",
    `phone` LARGEINT COMMENT "用户电话",
    `address` VARCHAR(500) COMMENT "用户地址",
    `register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
    user_id BIGINT NOT NULL,
    username VARCHAR(50) NOT NULL,
    city VARCHAR(20),
    age SMALLINT,
    sex SMALLINT,
    phone BIGINT,
    address VARCHAR(500),
    register_time TIMESTAMP,
    PRIMARY KEY (user_id, username)
)
DISTRIBUTED BY (user_id);

COMMENT ON COLUMN example_tbl_unique.user_id IS '用户id';

模型三:聚合模型

聚合模型导入数据时,会将Aggregate Key列相同的行聚合成一行,而将Value列按照设置的 AggregationType进行聚合。在云原生数据仓库AnalyticDB PostgreSQL中对应使用heap表 ,对于Aggregate Key创建唯一索引,同时插入数据时使用UPSERT方式进行。具体内容,请参见使用INSERT ON CONFLICT覆盖写入数据

使用示例
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
    `user_id` LARGEINT NOT NULL COMMENT "用户id",
    `date` DATE NOT NULL COMMENT "数据灌入日期时间",
    `city` VARCHAR(20) COMMENT "用户所在城市",
    `age` SMALLINT COMMENT "用户年龄",
    `sex` TINYINT COMMENT "用户性别",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

-----AnalyticDB PostgreSQL  不支持自动预聚合
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
    user_id BIGINT NOT NULL,
    "date" DATE NOT NULL,
    city VARCHAR(20),
    age SMALLINT,
    sex SMALLINT,
    last_visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
    cost BIGINT DEFAULT 0,
    max_dwell_time INT DEFAULT 0,
    min_dwell_time INT DEFAULT 99999,
    UNIQUE (user_id, "date", city, age, sex)
)
DISTRIBUTED BY(user_id);

INSERT INTO example_tbl_agg1 VALUES (10000,'2024-08-22','beijing', 18, 0, '2024-08-22 12:00:00', 20, 1000, 1000) ON CONFLICT (user_id, "date", city, age, sex) DO UPDATE SET last_visit_date = excluded.last_visit_date, cost = example_tbl_agg1.cost + excluded.cost, max_dwell_time = GREATEST(example_tbl_agg1.max_dwell_time, excluded.max_dwell_time), min_dwell_time = LEAST(example_tbl_agg1.min_dwell_time, excluded.min_dwell_time);

分区分桶

Doris通过PARTITION BY进行分区,DISTRIBUTED BY进行分桶,使用BUCKETS指定分桶数量,在云原生数据仓库AnalyticDB PostgreSQL中对应分区键PARTITION BY和分布键DISTRIBUTED BY。

使用示例
CREATE TABLE IF NOT EXISTS example_range_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "用户id",
    `date` DATE NOT NULL COMMENT "数据灌入日期时间",
    `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
    `city` VARCHAR(20) COMMENT "用户所在城市",
    `age` SMALLINT COMMENT "用户年龄",
    `sex` TINYINT COMMENT "用户性别",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
    PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
    PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
    PARTITION `p201703` VALUES LESS THAN ("2017-04-01"),
    PARTITION `p2018` VALUES [("2018-01-01"), ("2019-01-01"))
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
    "replication_num" = "1"
);

----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_range_tbl
(
    user_id BIGINT NOT NULL,
    "date" DATE NOT NULL,
    city VARCHAR(20),
    age SMALLINT,
    sex SMALLINT,
    visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
    a_cost BIGINT DEFAULT 0,
    dwell_time INT DEFAULT 0
)
PARTITION BY RANGE("date")
(
    PARTITION p201701 VALUES START ("2017-02-01") INCLUSIVE,
    PARTITION p201702 VALUES START ("2017-03-01") INCLUSIVE,
    PARTITION p201703 VALUES START ("2017-04-01") INCLUSIVE,
    PARTITION p2018 VALUES START ("2018-01-01") INCLUSIVE END ("2019-01-01") EXCLUSIVE
)
DISTRIBUTED BY (user_id);